---
layout: post
date: 2024-10-23
title: "TCP Server in Zig - Part 6 - Epoll"
description: "Using Linux's epoll to improve our async evented TCP server."
tags: [zig]
---

<p>In the last two parts we introduced the <code>poll</code> system call and, with it, patterns around evented I/O. <code>poll</code> has the advantage of being simple to use and supported on most platforms. However, as we saw, we need to iterate through all monitored sockets to figure out which is ready.</p>

<p>Another awkwardness with <code>poll</code>'s API is associating application-specific data with each <code>pollfd</code> being monitored. To know which <code>Client</code> is ready, we need to create a mapping between the <code>pollfd</code> and <code>Client</code>. While this isn't the end of the world, it is something that poll alternatives address.</p>

<p>The first alternative API that we'll look at is <code>epoll</code>. The bulk of what we've learnt about evented I/O is still applicable as-is. However, <code>epoll</code> is a Linux-specific system call. In order to support it as well as other platforms, we'll need to leverage Zig's comptime as well as some layer of abstraction. This is something we'll look at after exploring <code>kqueue</code>, a BSD/macOS-specific system call.</p>

<h3 id=epoll><a href="#epoll" aria-hidden=true>epoll</a></h3>
<p>The <code>epoll</code> API is three separate system calls. <code>epoll_create1</code> is used to create an epoll file descriptor. You can think of this as our epoll instance. <code>epoll_ctl</code> is used to modify our instance. With it, we can add, modify and remove sockets that we want our instance to monitor. Finally, we have <code>epoll_wait</code> which blocks until one of the monitored sockets is ready or until the configured timeout is reached.</p>

<p>With <code>poll</code> we specify the sockets to monitor on the blocking call to <code>poll</code> itself. This means we need to maintain our own array of <code>pollfd</code>. With <code>epoll</code>, because we have distinct functions to modify and wait, this ugliness is gone.</p>

<p>Here's a bare-bones runnable (on Linux) example:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                var event = linux.epoll_event{
                    // RDHUP is only reported if we ask for it, and we check for it below
                    .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
                    .data = .{.fd = client_socket},
                };
                try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}
{% endhighlight %}

<aside><p>If you're going to run this in Docker, you might need to change the address the listener is bound to from <code>127.0.0.1</code> to <code>0.0.0.0</code>.</p></aside>

<p>There are a number of interesting things in the above code, especially compared to <a href="/TCP-Server-In-Zig-Part-5a-Poll/#poll">our first runnable code that used <code>poll</code></a>. The list of <code>epoll_event</code>s that we pass into the function is going to be filled by <code>epoll_wait</code> itself. The return value tells us how many events were filled. So instead of having to loop through every monitored socket to see which is ready, we're given a list of ready sockets, i.e. <code>ready_list[0..ready_count]</code>. With many connections, of which only a few are active at any time, this is significantly more efficient.</p>

<p>Another change is how much simpler it is to remove a connection. We can explicitly remove a monitor via <code>epoll_ctl</code> and the <code>CTL_DEL</code> operation. But just closing the socket is enough to remove it.</p>
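<p>For cases where we want to stop monitoring a socket but keep it open, here's a minimal sketch of the explicit removal. The <code>stopMonitoring</code> helper is a name made up for this example, and a pipe stands in for a client socket to keep things short:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

// Hypothetical helper: stop monitoring `socket` but leave it open.
// CTL_DEL ignores the event argument, so we pass null.
fn stopMonitoring(efd: posix.fd_t, socket: posix.socket_t) !void {
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_DEL, socket, null);
}

pub fn main() !void {
    const efd = try posix.epoll_create1(0);
    defer posix.close(efd);

    // A pipe stands in for a client socket to keep the sketch short.
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);

    var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = fds[0]}};
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, fds[0], &event);
    try stopMonitoring(efd, fds[0]);

    // The read end now has data, but it's no longer monitored.
    _ = try posix.write(fds[1], "hi");

    var ready_list: [8]linux.epoll_event = undefined;
    // a timeout of 0 means: don't block, just report what's ready
    const ready_count = posix.epoll_wait(efd, &ready_list, 0);
    std.debug.print("ready: {d}\n", .{ready_count});
}
{% endhighlight %}

<p>Since the only monitor was removed before the write, <code>epoll_wait</code> has nothing to report and the ready count is 0.</p>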

<p>From the above, we can tell that <code>epoll_ctl</code> takes the <code>epoll</code> file descriptor created via <code>epoll_create1</code>, the operation (<code>CTL_ADD</code>, <code>CTL_DEL</code> or <code>CTL_MOD</code>), the file descriptor to monitor (sockets in our case) and an <code>epoll_event</code>. The <code>epoll_event</code> structure has two fields: <code>events</code>, which is like <code>pollfd.events</code>, and <code>data</code>. The <code>data</code> field is an untagged union and allows us to associate arbitrary information with an <code>epoll_event</code>. The value we store in <code>data</code> can be retrieved when iterating through the ready list. Above, we store the socket and retrieve the socket from <code>data.fd</code>. Storing the socket directly in <code>data</code> is a good start, but we can do more with this field.</p>

<h3 id=epoll.data.ptr><a href="#epoll.data.ptr" aria-hidden=true>epoll.data.ptr</a></h3>
<p>One of the challenges we faced with <code>poll</code> was associating a <code>pollfd</code> with its corresponding client. We managed to get this working by keeping two slices in sync, but never bothered figuring out how to get a pollfd from a client. With <code>epoll</code> this is much simpler. Specifically, the <code>epoll_event.data</code> field lets us store the numeric representation of a pointer. Or, put differently, we can associate anything we want with an <code>epoll_event</code>.</p>

<p>The benefit of this might not be immediately obvious, but let's add a simple <code>Client</code> and use <code>data</code> to associate our <code>epoll_event</code> directly with a client:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

// Added this simple client
const Client = struct {
    socket: posix.socket_t,
};

pub fn main() !void {
    // We now need an allocator to create our clients
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    const address = try std.net.Address.parseIp("0.0.0.0", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const efd = try posix.epoll_create1(0);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.ptr = 0}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            // we're using ptr instead of fd
            switch (ready.data.ptr) {
                0 => {
                    const socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                    errdefer posix.close(socket);

                    // We create a client
                    const client = try allocator.create(Client);
                    errdefer allocator.destroy(client);
                    client.* = .{.socket = socket};

                    var event = linux.epoll_event{
                        // RDHUP is only reported if we ask for it, and we check for it below
                        .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
                        // instead of setting fd, we set ptr which is a usize
                        .data = .{.ptr = @intFromPtr(client)}
                    };

                    try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, socket, &event);
                },
                else => |nptr| {
                    var closed = false;
                    var buf: [4096]u8 = undefined;

                    // we can get the client from data.ptr by reversing
                    // @intFromPtr with a call to @ptrFromInt
                    const client: *Client = @ptrFromInt(nptr);

                    // We should use client.readMessage() when we add our Reader
                    // back into our Client.
                    const read = posix.read(client.socket, &buf) catch 0;
                    if (read == 0) {
                        closed = true;
                    } else {
                        std.debug.print("[{d}] got: {any}\n", .{client.socket, buf[0..read]});
                    }

                    if (closed or ready.events & linux.EPOLL.RDHUP == linux.EPOLL.RDHUP) {
                        posix.close(client.socket);
                        allocator.destroy(client);
                    }
                },
            }
        }
    }
}
{% endhighlight %}

<p>The <code>data.ptr</code> field is used to store the usize-representation of our client pointer. We use Zig's built-in <code>@intFromPtr</code> to turn a pointer into an integer and then <code>@ptrFromInt</code> to reverse the process. A real hack in the above code is setting the <code>data.ptr</code> for our listening socket to <code>0</code>. Remember that <code>data</code> is an untagged union. If we use <code>data.fd = listener</code> for our listening socket, but then <code>data.ptr = @intFromPtr(client)</code> for client connections, we won't be able to tell which field is active. The "right" way to fix this is to create our own tagged union:</p>

{% highlight zig %}
const EventType = union(enum) {
    listener: posix.socket_t,
    recv: *Client,
};
{% endhighlight %}

<p>But that's another layer of allocations and indirection. Instead, we use <code>0</code> and hope that the operating system never allocates a Client at memory address 0 (this is more than "hope", and aside from embedded systems, most platforms would never allocate memory at address 0).</p>

<p>Now, when a socket is ready, we can get the corresponding client by calling <code>@ptrFromInt</code> on the <code>data.ptr</code> field:</p>

{% highlight zig %}
const client: *Client = @ptrFromInt(nptr);
{% endhighlight %}

<p>It might be obvious to some, but for this to work, a <code>client</code> <strong>has</strong> to be allocated on the heap. While we can call <code>@intFromPtr</code> on a pointer to a stack-allocated value, that value won't magically remain valid beyond its scope.</p>
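<p>To see the round trip in isolation, here's a small sketch with no sockets involved. The <code>Client</code> is the same minimal struct as above, and the socket value of <code>42</code> is a placeholder:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

const Client = struct {
    socket: posix.socket_t,
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    // heap-allocated, so the address stays valid until we destroy it
    const client = try allocator.create(Client);
    defer allocator.destroy(client);
    client.* = .{.socket = 42}; // placeholder value, not a real socket

    // what we'd store in epoll_event.data.ptr
    const nptr: usize = @intFromPtr(client);

    // what we'd do when the event comes back from epoll_wait
    const same: *Client = @ptrFromInt(nptr);
    std.debug.print("same pointer: {}, socket: {d}\n", .{same == client, same.socket});
}
{% endhighlight %}

<p>The integer is just the address. Nothing is copied, which is why the client has to outlive the monitor.</p>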

<h3 id=client-to-epoll><a href="#client-to-epoll" aria-hidden=true>Client to epoll_event</a></h3>
<p>Being able to associate arbitrary data with an <code>epoll_event</code> tightens up our code, but the real benefit is the ease with which we can modify the epoll instance from a client. The way we previously structured our <code>poll</code>-based server, it wasn't possible for <code>writeMessage</code> to be called externally (i.e. from application code) because the <code>Client</code> had no way to alter the server's <code>polls</code> list. We <em>could</em> have had each <code>Client</code> reference a <code>*Server</code>, but it would have been tricky to deal with the moving indexes.</p>

<p>With <code>epoll</code> we still need each <code>Client</code> to have the epoll instance (either directly, or via a reference to the <code>*Server</code>), but can use <code>epoll_ctl</code> to modify the monitor for the client. You'll recall that previously our <code>write</code> function would return a <code>false</code> to signal an incomplete write and <code>true</code> to signal a completed write. This value was used by our <code>run</code> function to switch to and from write-mode and read-mode. We can now bake this into <code>write</code>:</p>

{% highlight zig %}
fn write(self: *Client) !void {
    var buf = self.to_write;
    defer self.to_write = buf;
    while (buf.len > 0) {
        const n = posix.write(self.socket, buf) catch |err| switch (err) {
            error.WouldBlock => {
                // switch to write-mode
                var event = linux.epoll_event{
                    .events = linux.EPOLL.OUT,
                    .data = .{.ptr = @intFromPtr(self)},
                };
                return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
            },
            else => return err,
        };

        if (n == 0) {
            return error.Closed;
        }
        buf = buf[n..];
    } else {
        // done writing, switch to read-mode
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(self)},
        };
        return posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
    }
}
{% endhighlight %}

<p>That might seem like a minor change, but it enables something you'll almost certainly need: the ability to initiate a write from the application. There is one inefficiency in the above code: on a complete write, we revert to read-mode. However, most of the time, we'll probably never have gone into write-mode to begin with. As we've said before, unless your server is very busy, your client is lagging, or you're sending very large messages, there's a reasonable chance that <code>posix.write</code> will succeed in a single call. In such cases, reverting to read-mode is an unnecessary and wasteful system call. I'll leave this up to you to fix, but one solution would be to add an <code>IOMode</code> enum with two values: <code>read</code> and <code>write</code> and track which the client is currently in. Then we can check <code>if self.mode == .write</code> before switching to read-mode.</p>
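<p>As a starting point, here's one way the mode tracking could look. This is a sketch under a few assumptions: the <code>Client</code> below is trimmed down to the fields the example needs, and <code>readMode</code>, <code>writeMode</code> and <code>setEvents</code> are names picked for illustration:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

const IOMode = enum { read, write };

// Trimmed-down, hypothetical Client: only the fields this sketch needs.
const Client = struct {
    efd: posix.fd_t,
    socket: posix.socket_t,
    // a newly accepted client is monitored for EPOLL.IN
    mode: IOMode = .read,

    fn readMode(self: *Client) !void {
        // the common case: we never left read-mode, so skip the system call
        if (self.mode == .read) return;
        try self.setEvents(linux.EPOLL.IN);
        self.mode = .read;
    }

    fn writeMode(self: *Client) !void {
        if (self.mode == .write) return;
        try self.setEvents(linux.EPOLL.OUT);
        self.mode = .write;
    }

    fn setEvents(self: *Client, events: u32) !void {
        var event = linux.epoll_event{
            .events = events,
            .data = .{.ptr = @intFromPtr(self)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, self.socket, &event);
    }
};
{% endhighlight %}

<p>With this in place, the <code>error.WouldBlock</code> branch of <code>write</code> would become <code>return self.writeMode()</code> and the final <code>else</code> would become <code>return self.readMode()</code>. The mode is only updated after <code>epoll_ctl</code> succeeds, so a failed call doesn't leave the client thinking it's in a mode it never entered.</p>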

<h3 id=edge_triggered><a href="#edge_triggered" aria-hidden=true>Edge-Triggered</a></h3>
<p>We mentioned that <code>poll</code> is always level-triggered. That is, as long as a condition holds, we'll get a notification. For example, if one call to <code>poll</code> notifies us that a certain socket is ready and we then call <code>poll</code> again, without reading the socket, we'll get another notification. This is also how <code>epoll</code> works by default. But we can make <code>epoll</code> edge-triggered on a per-socket basis.</p>

<p>Let's go back to our first <code>epoll</code> program, and change the code that handles a client's readiness:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    // epoll_create1 takes flags. We aren't using any in these examples
    const efd = try posix.epoll_create1(0);

    {
        // monitor our listening socket
        var event = linux.epoll_event{.events = linux.EPOLL.IN, .data = .{.fd = listener}};
        try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    var ready_list: [128]linux.epoll_event = undefined;
    while (true) {
        const ready_count = posix.epoll_wait(efd, &ready_list, -1);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket = ready.data.fd;
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                var event = linux.epoll_event{
                    .events = linux.EPOLL.IN,
                    .data = .{.fd = client_socket}
                };
                try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
            } else {
                // THIS HAS CHANGED
                std.debug.print(".", .{});
            }
        }
    }
}
{% endhighlight %}

<p>For now, only the end of the code has changed. Instead of reading the message and printing it, we write a dot <code>.</code>. If you run this code (again, Linux only), connect and send a message, your screen should get filled with dots. This is because <code>epoll</code> is, by default, in level-triggered mode and continues to notify us of the socket's readiness.</p>

<p>When adding the monitor for the client socket, if we change <code>events</code> from <code>linux.EPOLL.IN</code> to <code>linux.EPOLL.IN | linux.EPOLL.ET</code>, i.e.:</p>

{% highlight zig %}
var event = linux.epoll_event{
    // THIS IS CHANGED
    .events = linux.EPOLL.IN | linux.EPOLL.ET,
    .data = .{.fd = client_socket}
};
try posix.epoll_ctl(efd, linux.EPOLL.CTL_ADD, client_socket, &event);
{% endhighlight %}

<p>we make the monitor edge-triggered (ET). Now, if we connect a client and send a message, we'll get a single notification. The monitor isn't deleted or even disabled. If we send another message, we'll get a new notification. <code>epoll</code> is pretty smart too: a new event doesn't have to happen while <code>epoll_wait</code> is blocked. So there is no race condition between setting a monitor and calling <code>epoll_wait</code>. In our above code, if the client sends a message right after the socket is added with <code>epoll_ctl</code> but before <code>epoll_wait</code> is called, we <em>will</em> get the notification.</p>
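<p>One consequence of edge-triggered mode is worth spelling out. Since we're only told once, leftover data that we don't read won't trigger another notification on its own. So, on each notification, we'd typically keep reading until the read would block. Here's a rough sketch of that loop, assuming a non-blocking socket. The <code>drain</code> helper is a made-up name, and it simply counts bytes rather than processing them:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: read everything currently available on a
// non-blocking socket and return the number of bytes consumed.
fn drain(socket: posix.socket_t) !usize {
    var total: usize = 0;
    var buf: [4096]u8 = undefined;
    while (true) {
        const n = posix.read(socket, &buf) catch |err| switch (err) {
            // nothing left to read; epoll will tell us when more arrives
            error.WouldBlock => return total,
            else => return err,
        };
        if (n == 0) {
            // the other side closed the connection
            return error.Closed;
        }
        // a real server would process buf[0..n] here
        total += n;
    }
}
{% endhighlight %}

<p>This is the same shape as the <code>while (true)</code> loop around <code>client.readMessage()</code> in the full example, which keeps going until there are no more messages.</p>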

<p>We can also use <code>linux.EPOLL.ONESHOT</code> which disables the notification for the socket after readiness has been signaled once, including for any new messages. <code>epoll_ctl</code> with the <code>linux.EPOLL.CTL_MOD</code> operation needs to be called to re-enable the monitor. This is obviously different than using edge-triggered since notifications are disabled even for new events. While <code>ONESHOT</code> might be useful to make sure that a single message per client is processed, it does have the disadvantage of requiring many more system calls: one every time we want to re-enable the monitor.</p>
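<p>Re-enabling a <code>ONESHOT</code> monitor might look something like this. It's a sketch, and <code>rearm</code> is a name chosen for the example. Note that the full set of events, including <code>ONESHOT</code> itself, has to be given again:</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;
const linux = std.os.linux;

// Hypothetical helper: re-enable a ONESHOT monitor after it has fired.
// CTL_MOD replaces the monitored events, so ONESHOT must be repeated
// if we want the monitor to disable itself again after the next event.
fn rearm(efd: posix.fd_t, socket: posix.socket_t) !void {
    var event = linux.epoll_event{
        .events = linux.EPOLL.IN | linux.EPOLL.ONESHOT,
        .data = .{.fd = socket},
    };
    try posix.epoll_ctl(efd, linux.EPOLL.CTL_MOD, socket, &event);
}
{% endhighlight %}

<p>We'd call this once we're done handling the client's message and are ready to hear about that socket again. If unread data is still sitting in the socket when the monitor is re-armed, the next <code>epoll_wait</code> reports it.</p>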

<h3 id=conclusion><a href="#conclusion" aria-hidden=true>Conclusion</a></h3>
<p>Besides the fact that <code>epoll</code> is Linux-specific, it's safe to think of it as a better version of <code>poll</code>. It's more efficient, easier to integrate and has more features. Except for the specifics of the API, everything about evented I/O is unchanged. Using <code>@intFromPtr</code> and <code>@ptrFromInt</code> can actually be useful now and again, but it's mostly used as above, when integrating with a C library that lets you associate arbitrary data (often a <code>usize</code>) with a C structure.</p>

<p>Below, you'll find a full working (on Linux) example. It merges what we've learned about <code>epoll</code> with our <code>Server</code>, <code>Client</code> and <code>Reader</code> from previous parts. We've also introduced an <code>Epoll</code> structure. The goal is to start creating some abstraction to eventually deal with having platform-specific implementations (i.e. using epoll on Linux and kqueue on BSD/macOS). This abstraction is incomplete, but it's a start and something we'll dedicate a whole part to.</p>

<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-5b-Poll/>Poll (Part 2)</a>
  <a class=next href=/TCP-Server-In-Zig-Part-7-Kqueue/>Kqueue</a>
</div>

<h3 id=appendix-a><a href="#appendix-a" aria-hidden=true>Appendix A - Code</a></h3>
<aside><p>This example will only run on Linux. If you try to run it in Docker, you might need to change the listening address from <code>127.0.0.1</code> to <code>0.0.0.0</code>.</p></aside>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const linux = std.os.linux;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("127.0.0.1", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}

// 1 minute
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    // maximum # of allowed clients
    max: usize,

    loop: Epoll,

    // passed to Client.init to create each client's read and write buffers
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    read_timeout_list: ClientList,

    // for creating client
    client_pool: std.heap.MemoryPool(Client),
    // for creating nodes for our read_timeout list
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        const loop = try Epoll.init();
        errdefer loop.deinit();

        return .{
            .max = max,
            .loop = loop,
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.loop.deinit();
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);
        var read_timeout_list = &self.read_timeout_list;

        try self.loop.addListener(listener);

        while (true) {
            const next_timeout = self.enforceTimeout();
            const ready_events = self.loop.wait(next_timeout);
            for (ready_events) |ready| {
                switch (ready.data.ptr) {
                    0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
                    else => |nptr| {
                        const events = ready.events;
                        const client: *Client = @ptrFromInt(nptr);

                        if (events & linux.EPOLL.IN == linux.EPOLL.IN) {
                            // this socket is ready to be read
                            while (true) {
                                const msg = client.readMessage() catch {
                                    self.closeClient(client);
                                    break;
                                } orelse break;   // no more messages

                                client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                                read_timeout_list.remove(client.read_timeout_node);
                                read_timeout_list.append(client.read_timeout_node);

                                client.writeMessage(msg) catch {
                                    self.closeClient(client);
                                    break;
                                };
                            }
                        } else if (events & linux.EPOLL.OUT == linux.EPOLL.OUT) {
                            client.write() catch self.closeClient(client);
                        }
                    }
                }
            }
        }
    }

    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();
        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                // this client's timeout is the first one that's in the
                // future, so we now know the maximum time we can block on
                // epoll_wait before having to call enforceTimeout again
                return @intCast(diff);
            }

            // This client's timeout is in the past. We shut down the read
            // side of the socket, which makes it readable. The event loop
            // then gets error.Closed from readMessage and calls closeClient.
            // This is a holdover from the poll-based version, where we
            // couldn't map a client back to its pollfd.
            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            // We have no client that times out in the future (if we did
            // we would have hit the return above).
            return -1;
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.max - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);
            client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
                posix.close(socket);
                // we return without an error, so the errdefer above won't run
                self.client_pool.destroy(client);
                log.err("failed to initialize client: {}", .{err});
                return;
            };
            errdefer client.deinit(self.allocator);

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };
            self.read_timeout_list.append(client.read_timeout_node);
            try self.loop.newClient(client);
            self.connected += 1;
        } else {
            // we've run out of space, stop monitoring the listening socket
            try self.loop.removeListener(listener);
        }
    }

    fn closeClient(self: *Server, client: *Client) void {
        self.read_timeout_list.remove(client.read_timeout_node);

        posix.close(client.socket);
        self.client_node_pool.destroy(client.read_timeout_node);
        client.deinit(self.allocator);
        self.client_pool.destroy(client);
    }
};

const Client = struct {
    // This probably shouldn't be a pointer. Epoll is currently lightweight
    // and is safe to copy. But, in Part 8, when we build an abstraction
    // for Epoll and Kqueue, this will be necessary.
    loop: *Epoll,

    socket: posix.socket_t,
    address: std.net.Address,

    // Used to read length-prefixed messages
    reader: Reader,

    // Bytes we still need to send. This is a slice of `write_buf`. When
    // empty, then we're in "read-mode" and are waiting for a message from the
    // client.
    to_write: []u8,

    // Buffer for storing our length-prefixed message
    write_buf: []u8,

    // absolute time, in milliseconds, when this client should time out if
    // a message isn't received
    read_timeout: i64,

    // Node containing this client in the server's read_timeout_list
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *Epoll) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .loop = loop,
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0, // let the server set this
            .read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !void {
        if (self.to_write.len > 0) {
            // Depending on how you structure your code, this might not be possible
            // For example, in an HTTP server, the application might not control
            // the actual "writeMessage" call, and thus it would not be possible
            // to make more than one writeMessage call per request.
            // For this demo, we'll just return an error.
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            // Could allocate a dynamic buffer. Could use a large buffer pool.
            return error.MessageTooLarge;
        }

        // copy our length prefix + message to our buffer
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        // setup our to_write slice
        self.to_write = self.write_buf[0..end];

        // immediately write what we can
        return self.write();
    }

    // Writes as much of `to_write` as the socket will accept. If the socket
    // would block, we switch to write-mode. Once everything is written, we
    // switch back to read-mode.
    fn write(self: *Client) !void {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return self.loop.writeMode(self),
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return self.loop.readMode(self);
        }
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        const buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
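            // ensureSpace expects the total bytes needed from `start`, and the
            // length prefix needs 4 (assumes the buffer is at least 4 bytes)
            self.ensureSpace(4) catch unreachable;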
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix
        // (widened to usize so a large client-supplied length can't overflow a u32)
        const total_len = @as(usize, message_len) + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};

// We'll eventually need to build a platform abstraction over epoll and
// kqueue. This is a rough start.
const Epoll = struct {
    efd: posix.fd_t,
    ready_list: [128]linux.epoll_event = undefined,

    fn init() !Epoll {
        const efd = try posix.epoll_create1(0);
        return .{.efd = efd};
    }

    fn deinit(self: Epoll) void {
        posix.close(self.efd);
    }

    fn wait(self: *Epoll, timeout: i32) []linux.epoll_event {
        const count = posix.epoll_wait(self.efd, &self.ready_list, timeout);
        return self.ready_list[0..count];
    }

    fn addListener(self: Epoll, listener: posix.socket_t) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = 0},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, listener, &event);
    }

    fn removeListener(self: Epoll, listener: posix.socket_t) !void {
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_DEL, listener, null);
    }

    fn newClient(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_ADD, client.socket, &event);
    }

    fn readMode(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.IN,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
    }

    fn writeMode(self: Epoll, client: *Client) !void {
        var event = linux.epoll_event{
            .events = linux.EPOLL.OUT,
            .data = .{.ptr = @intFromPtr(client)},
        };
        try posix.epoll_ctl(self.efd, linux.EPOLL.CTL_MOD, client.socket, &event);
    }
};
{% endhighlight %}
<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-5b-Poll/>Poll (Part 2)</a>
  <a class=next href=/TCP-Server-In-Zig-Part-7-Kqueue/>Kqueue</a>
</div>
